package com.atuguigu.flink.Day01.Example;

import com.atuguigu.flink.Day01.Singlesensor.SensorSource;
import com.atuguigu.flink.sensor.SendsorReading;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.RichFilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

/**
 * Demonstrates four equivalent ways of keeping only the readings of a single
 * sensor from a stream: a lambda filter, an anonymous {@link FilterFunction},
 * a named {@link FilterFunction} class, and a {@link FlatMapFunction} that
 * emits zero or one element per input.
 *
 * <p>All four pipelines read from the same source stream and each one prints
 * its own result, so a matching reading is printed once per pipeline.
 */
public class FilterExample {

    /** Sensor id that every variant below keeps; all other readings are dropped. */
    private static final String TARGET_SENSOR_ID = "sensor_1";

    /**
     * Builds the four filtering pipelines and runs the job.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job fails during execution
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // Single parallel instance per operator.
        env.setParallelism(1);

        // NOTE(review): SensorSource is defined elsewhere; presumably it emits
        // readings whose ids look like "sensor_N" - confirm against the source.
        DataStreamSource<SendsorReading> stream = env.addSource(new SensorSource());

        // Variant 1: lambda filter keeping only readings of the target sensor.
        stream.filter(r -> TARGET_SENSOR_ID.equals(r.id)).print();

        // Variant 2: anonymous FilterFunction. Filtering never changes the element
        // type, so the interface takes a single type parameter.
        stream.filter(
                new FilterFunction<SendsorReading>() {
                    @Override
                    public boolean filter(SendsorReading sendsorReading) throws Exception {
                        return TARGET_SENSOR_ID.equals(sendsorReading.id);
                    }
                }
        ).print();

        // Variant 3: the same predicate implemented as a named class.
        stream.filter(
                new MyFilter()
        ).print();

        // Variant 4: flatMap used as a filter - collect the element when it matches,
        // emit nothing otherwise.
        stream.flatMap(
                new FlatMapFunction<SendsorReading, SendsorReading>() {

                    @Override
                    public void flatMap(SendsorReading sendsorReading, Collector<SendsorReading> collector) throws Exception {
                        // Uses the shared constant; the previous literal was misspelled
                        // ("sensr_1"), so this branch never matched the intended sensor.
                        if (TARGET_SENSOR_ID.equals(sendsorReading.id)) {
                            collector.collect(sendsorReading);
                        }
                    }
                }
        ).print();


        env.execute();

    }

    /**
     * Named {@link FilterFunction} that accepts only readings whose id equals
     * {@code "sensor_1"}.
     */
    public static class MyFilter implements FilterFunction<SendsorReading> {

        /**
         * @param sendsorReading the reading to test
         * @return {@code true} when the reading's id is {@code "sensor_1"};
         *         {@code false} otherwise, including when the id is {@code null}
         */
        @Override
        public boolean filter(SendsorReading sendsorReading) throws Exception {
            // Constant-first comparison so a null id is rejected rather than throwing.
            return TARGET_SENSOR_ID.equals(sendsorReading.id);
        }
    }
}
